<?php

//设置
defined('REDISES_DEBUG') or define('REDISES_DEBUG', true); //是否显示错误
defined('SERVER_STATUS_CACHE_EXPIRE') or define('SERVER_STATUS_CACHE_EXPIRE', 1800); //服务器宕机记录缓存过期时间
defined('TIMEOUT_SHORT') or define('TIMEOUT_SHORT', 0.2); //redis服务器连接超时时间，秒为单位
defined('TIMEOUT_LONG') or define('TIMEOUT_LONG', 0.5); //redis服务器首次连接不上，二次重连超时时间，秒为单位
//以下常量不要更改
define('SERVER_STATUS_CACHE_KEY', 'Redises.ServerStatus'); //服务器宕机记录缓存名称
define('REDIS_UPDATE', 1); //更新服务器宕机记录缓存
define('REDIS_RETRY', 2); //重新检测服务器宕机情况
define('REDIS_BALAN', 1); //均衡分布(hash分布)
define('REDIS_RAND', 2); //随机分布

/**
 * Redises  redis集群，带宕机移除机制
 * RedisQ   redis队列，随机分布
 * RedisX   redis值存储，哈希分布，支持数组序列化
 *
 * @author 吴啸
 * @package Redises
 * @example
 * $redis = new Redises();//默认hash分布
 * or
 * $redis = new Redises(REDIS_RAND);//可以定义为随机分布
 */
class Redises {

    /**
     * 负载模式可选项，1：均衡分布式，2：随机分布
     * @var array
     */
    private static $_mode = array(1, 2);

    /**
     * 当前负载模式
     * @var int
     */
    public $mode;

    /**
     * 服务器池
     * @var array
     */
    private $_serverPool = array(
            //'127.0.0.1:6379'=>true,
            //'192.168.0.182'=>true,
    );

    /**
     * 连接池
     * @var array
     */
    private $_linkPool = array(
            //'127.0.0.1:6379'=>null,
            //'192.168.0.182'=>Object Redis,
    );

    /**
     * 服务器检测到宕机时间记录
     * @var array
     */
    private $_downTime = array(
            //'127.0.0.1:6379'=>1393592722,
            //'192.168.0.182'=>1393690743,
    );
    
    /**
     * 连接池内的服务器是否已全部建立连接
     * @var bool
     */
    private $_allConnected;
    
    /**
     * 一致性哈希实例
     * @var object
     * @instanceof RedisConsistentHashing
     */
    private $_hashInstant;

    /**
     * 动态随机实例
     * @var object
     * @instanceof RedisDynamicRandom
     */
    private $_randInstant;

    /**
     * 最后访问的redis连接
     * @var object
     * @instanceof Redis
     */
    protected $_lastLink;

    /**
     * 最后访问的redis服务器
     * @var string
     * @example '127.0.0.1:6379'
     */
    protected $_lastHost;

    /**
     * cache持久件
     * @var null/object
     * @instanceof Redis/SplFileInfo
     */
    private $_cacheObject = null;

    /**
     * 任何未定义的方法都指派到redis基类
     * @param string $method_name
     * @param array $params
     * @return mixed
     */
    public function __call($method_name, $params) {
        $key = !empty($params[0]) ? $params[0] : '';
        $redis = $this->getRedis($key);
        if ($redis instanceof void){
            throw new RedisesException("Empty server list or all downtime");
            return false;
        }elseif(!method_exists($redis, $method_name)) {
            throw new RedisesException("Unable to call {$method_name}()");
            return false;
        }
        return call_user_method_array($method_name, $this->getRedis($key), $params);
    }

    /**
     * 构造函数
     * @param int $mode 负载均衡模式，默认为均衡分布
     * @comment 首先确定缓存模式，然后获取redis服务器宕机记录缓存
     */
    public function __construct($mode = REDIS_BALAN) {
        //redis扩展也是基于php 的socket方式实现，因此socket访问不超时
        ini_set('default_socket_timeout', -1);

        $this->_whereToCache();
        if ($_downTime = $this->downTime())
            $this->_downTime = $_downTime;
        //var_dump($this->_downTime);
        $this->mode = $mode;

        register_shutdown_function(array($this, 'downTime'), REDIS_UPDATE);
    }

    /**
     * 析构函数
     * @comment 首先关闭所有已打开的redis连接，然后把此次获取的服务器宕机记录缓存起来
     */
    public function __destruct() {
        foreach ($this->_linkPool as $key => $value) {
            if (is_object($this->_linkPool[$key]))
                $this->_linkPool[$key]->close();
        }
        //$this->downTime(REDIS_UPDATE);
    }

    /**
     * 添加服务器
     * @param string $host ip地址
     * @param int $port 端口
     * @return bool
     * @example $redis->addserver('192.168.0.182');
     */
    public function addserver($host, $port = 6379) {
        //加入服务器池
        $this->_serverPool["$host:$port"] = true;
        
        //如果服务器不存在宕机记录缓存中，则默认可用，加入连接池
        if (empty($this->_downTime["$host:$port"])) {
            $this->_linkPool["$host:$port"] = null;
        }//如果服务器有宕机记录，则检查上次宕机时间确定是否重新尝试连接
        elseif (( $this->_downTime["$host:$port"] + SERVER_STATUS_CACHE_EXPIRE) < time()) {
            $this->_linkPool["$host:$port"] = null;
        }
        return isset($this->_linkPool["$host:$port"]);
    }

    /**
     * 返回最后分配的redis服务器
     * @return string/null
     * @example 
     * $redis->lastserver();
     */
    public function lastserver() {
        return $this->_lastHost;
    }

    /**
     * 返回最后分配的redis连接
     * @return object/null
     * @instanceof Redis
     * @example 
     * $redis->lastlink();
     */
    public function lastlink() {
        return $this->_lastLink;
    }

    /**
     * 返回所有服务器状态
     * @return array
     * array (
     *  '192.168.0.180:6379' => 'Down at 2014-02-20 16:56:56',
     *  '192.168.0.181:6379' => 'Connected',
     *  '192.168.0.182:6379' => 'Not connected',
     * )
     * @example $redis->dumpserver();
     */
    public function dumpserver() {
        $_serverPool = array();
        foreach ($this->_serverPool as $host_port => $status) {
            if (!empty($this->_downTime[$host_port])) {
                $_serverPool[$host_port] = 'Down at ' . date('Y-m-d H:i:s', $this->_downTime[$host_port]);
            } elseif (isset($this->_linkPool[$host_port]) && is_object($this->_linkPool[$host_port])) {
                $_serverPool[$host_port] = 'Connected';
            } else {
                $_serverPool[$host_port] = 'Not connected';
            }
        }
        return $_serverPool;
    }

    /**
     * 返回关于 Redis 服务器的各种信息和统计值
     * @param string $host ip地址
     * @return array
     * @example 
     * $redis->info('192.168.0.182');
     * or
     * $redis->info('192.168.0.182:6380');
     */
    public function info($host = null) {
        if (!$host)
            return is_object($this->_lastLink) ? $this->_lastLink->info() : false;
        return $this->connect($host)->info();
    }

    /**
     * 查找符合给定模式的key
     * @param string $host ip地址
     * @param string $pattern 模式字串
     * @return array
     * @example 
     * $redis->info('192.168.0.182','*');
     * or
     * $redis->info('192.168.0.182:6380','list_*');
     */
    public function keys($host = null, $pattern = '*') {
        if (!$host)
            return is_object($this->_lastLink) ? $this->_lastLink->keys($pattern) : false;
        return $this->connect($host)->keys($pattern);
    }

    //创建连接，加入连接池
    protected function connect($host, $port = 6379) {
        //var_dump($host);
        if ($pos = strpos($host, ':')) {
            $port = (int) substr($host, $pos + 1);
            $host = substr($host, 0, $pos);
        }
        //var_dump($host,$port);
        //如果不在服务器池中，直接返回连接
        if (!isset($this->_serverPool["$host:$port"])) {
            return $this->_connect($host, $port);
        }

        //如果连接失败，在宕机记录中记录宕机时间
        if (!$link = $this->_connect($host, $port)) {
            $this->_downTime["$host:$port"] = time();
            $this->downTime(REDIS_UPDATE);
            return false;
        }
        //如果有宕机记录但连接成功了，将宕机记录去掉
        elseif (isset($this->_downTime["$host:$port"])){
            unset($this->_downTime["$host:$port"]);
            $this->downTime(REDIS_UPDATE);
        }

        $this->_lastHost = "$host:$port";
        $this->_lastLink = &$link;
        return $link;
    }

    //创建连接
    private function _connect($host, $port) {
        if (!empty($this->_linkPool["$host:$port"])) {
            //如果已经建立连接
            $link = $this->_linkPool["$host:$port"];
        } else {
            $link = new Redis();
            try {
                $r = $link->connect($host, $port, TIMEOUT_SHORT);
            } catch (Exception $exc) {}

            if (!isset($r) || !$r) {
                try {
                    $r = $this->_cacheObject
                            ?$link->connect($host, $port, TIMEOUT_LONG)
                            :$link->connect($host, $port, TIMEOUT_SHORT);
                } catch (Exception $exc) {}
            }

            if (!isset($r) || !$r)
                $link = false;
            
            if (is_object($link))
                //如果连接成功，将对象放入连接池
                $this->_linkPool["$host:$port"] = &$link;
            else
                //如果连接失败，从连接池中去除
                unset($this->_linkPool["$host:$port"]);
            
            //检查连接池内的服务器是否已全部建立连接
            $this->allConnected(REDIS_UPDATE);
        }
        
        return $link;
    }

    //根据
    protected function getRedis($key = null) {
        $mode = in_array($this->mode, self::$_mode) ? $this->mode : 1;
        if ($mode == 1) {
            return $this->hashRedis($key);
        } elseif ($mode == 2) {
            return $this->randRedis();
        }
    }

    /**
     * 获得redis Resources，根据key名用hash分配
     * @param string $key redis存的key/或随机值
     * @return object
     * @instanceof Redis
     */
    protected function hashRedis($key = null) {
        empty($key) ? $key = '' : '';
        $this->downTime(REDIS_RETRY);//重新检测服务器宕机情况

        //建立一致性哈希圆环
        if (!(@$this->_hashInstant instanceof RedisConsistentHashing)) {
            $this->_hashInstant = new RedisConsistentHashing();
            foreach ($this->_linkPool as $host_port => $obj) {
                $this->_hashInstant->addNode($host_port);
            }
        }

        do {
            $host_port = $this->_hashInstant->getOneNode($key);
            //var_dump($host_port);
            $link = $this->connect($host_port);
            if (!$link)
                $this->_hashInstant->removeNode($host_port);
        } while (!$link && !empty($this->_linkPool));
        return $link ? $link : new void();
    }

    /**
     * 获得redis Resources，随机分配
     * @return object
     * @instanceof Redis
     */
    protected function randRedis(array $_elementPool = array()) {
        $this->downTime(REDIS_RETRY);//重新检测服务器宕机情况
        
        //建立动态随机
        if (!(@$this->_randInstant instanceof RedisDynamicRandom)) {
            $this->_randInstant = new RedisDynamicRandom();
        }
        //var_dump($_elementPool);
        foreach ($_elementPool as $element => $num) {
            $this->_randInstant->setNum($element, $num);
        }
        //var_dump($this->_randInstant->dump());

        do {
            //$host_port = array_rand($this->_linkPool);
            $host_port = $this->_randInstant->rand(array_keys($this->_linkPool));
            //var_dump($host_port);
            $link = $this->connect($host_port);
            //var_dump($this->_linkPool,$host_port);
        } while (!$link && !empty($this->_linkPool));
        return $link ? $link : new void();
    }

    /**
     * 决定cache模式
     * 首先检测本地是否存在redis服务器，是的话用其缓存服务器宕机记录，否则用文件缓存
     * 文件缓存首先检测系统临时文件目录(linux:/tmp/,windows:c:\windows\temp\)是否可写，否则用类文件所在目录
     * @return bool
     */
    private function _whereToCache() {
        //检测redis
        $redis = new Redis();
        try {
            if ($redis->connect('127.0.0.1', 6379, 0.005)) {
                $this->_cacheObject = $redis;
                return true;
            }
        } catch (Exception $exc) {}

        //检测系统临时目录
        $directory = sys_get_temp_dir() . DIRECTORY_SEPARATOR;
        if (is_writable($directory)) {
            $this->_cacheObject = new SplFileInfo($directory . SERVER_STATUS_CACHE_KEY);
            return true;
        }

        //检测类文件所在目录
        $directory = realpath(dirname(__FILE__)) . DIRECTORY_SEPARATOR;
        if (!is_writable($directory) && !chmod($directory, 0777)) {
            return false;
        }
        if (is_writable($directory)) {
            $this->_cacheObject = new SplFileInfo($directory . SERVER_STATUS_CACHE_KEY);
            return true;
        }

        return false;
    }

     /**
     * 检查/告知连接池内的服务器是否已全部建立连接
     * @param int $action 行为指向
     * @return void/bool
     */
    public function allConnected($action = null){
        if ($action === 1) {
            //检查是否已全部建立连接
            $this->_allConnected = (false === array_search(null, $this->_linkPool,true));
        }else{
            //告知是否已全部建立连接
            return $this->_allConnected;
        }
    }
    
    /**
     * 更新/返回服务器宕机时间记录
     * @param int $action 行为指向
     * @return void/bool/array
     */
    public function downTime($action = null) {
        if ($action === 1) {
            //重新写入记录缓存
            //var_dump($this->_linkPool,$this->_serverPool,$this->_downTime);
            $this->_serverStatusCache($this->_downTime);
        }elseif ($action === 2){
            //检测记录缓存中是否有过期需要重新测试连接的服务器
            $retry = false;
            if (!empty($this->_downTime)){
                foreach ($this->_downTime as $host_port=>$downtime){
                    if (($downtime + SERVER_STATUS_CACHE_EXPIRE) < time()){
                        $this->_linkPool[$host_port] = null;
                        $retry = true;
                    }
                }
            }
            return $retry;
        }else{
            //返回记录数组
            return $this->_serverStatusCache();
        }
    }

    /**
     * 取出/缓存服务器宕机记录
     * @param array $data 服务器宕机记录数组
     * @return mixed
     */
    private function _serverStatusCache($data = null) {
        $key = SERVER_STATUS_CACHE_KEY;
        $lifetime = SERVER_STATUS_CACHE_EXPIRE;

        //如果本地redis服务器存在，就使用
        if ($this->_cacheObject instanceof Redis) {
            //从缓存取值
            if ($data === null) {
                return @unserialize($this->_cacheObject->get($key));
            }
            //设置缓存
            return $this->_cacheObject->setex($key, $lifetime, serialize($data));
        } elseif ($this->_cacheObject instanceof SplFileInfo) {
            //从缓存取值
            if ($data === null) {
                // Wrap operations in try/catch to handle notices
                try {
                    // Open file
                    $file = $this->_cacheObject;

                    // If file does not exist
                    if (!$file->isFile()) {
                        // Return default value
                        return false;
                    } else {
                        // Open the file and parse data
                        $created = $file->getMTime();
                        $data = $file->openFile();
                        $lifetime = $data->fgets();

                        // If we're at the EOF at this point, corrupted!
                        if ($data->eof()) {
                            return false;
                            throw new RedisesException(__METHOD__ . ' corrupted cache file!');
                        }

                        if (($created + (int) $lifetime) < time()) {
                            // Delete the file
                            unset($data);
                            unlink($file->getRealPath());
                            return false;
                        }

                        $cache = '';
                        while ($data->eof() === FALSE) {
                            $cache .= $data->fgets();
                        }
                        return unserialize($cache);
                    }
                } catch (ErrorException $e) {
                    // Handle ErrorException caused by failed unserialization
                    if ($e->getCode() === E_NOTICE) {
                        throw new RedisesException(__METHOD__ . ' failed to unserialize cached object with message : ' . $e->getMessage());
                    }

                    // Otherwise throw the exception
                    throw $e;
                }
            }

            //设置缓存
            // Open file to inspect
            $resouce = $this->_cacheObject;
            $file = $resouce->openFile('w');

            try {
                $data = $lifetime . "\n" . serialize($data);
                $file->fwrite($data, strlen($data));
                return (bool) $file->fflush();
            } catch (ErrorException $e) {
                // If serialize through an error exception
                if ($e->getCode() === E_NOTICE) {
                    // Throw a caching error
                    throw new RedisesException(__METHOD__ . ' failed to serialize data for caching with message : ' . $e->getMessage());
                }

                // Else rethrow the error exception
                throw $e;
            }
        }

        return false;
    }

}

/**
 * 动态概率随机
 * 
 * @package Redises
 * @example
 * $rand = new RedisDynamicRandom();
 * $rand->setNum('127.0.0.1:6379',7);
 * $rand->setNum('192.168.0.182:6379',10);
 * $rand->rand();
 * $rand->rand(array('127.0.0.1:6379','192.168.0.182:6379','172.16.0.100:6379'));
 * $rand->resetNum('127.0.0.1:6379');
 */
class RedisDynamicRandom {

    /**
     * 元素数量记录
     * @var array
     */
    private $_elementPool = array(
            //'127.0.0.1:6379'=>7,
            //'192.168.0.182:6379'=>10,
    );

    /**
     * 最近一次的生成的元素排列
     * @var array
     */
    private $_elements = array(
            //'192.168.0.182:6379',
            //'127.0.0.1:6379',
            //'192.168.0.182:6379',
    );

    /**
     * 元素数量统一除数
     * @var int
     * @comment 某个或某些元素数量太大时，为了提高运算效率，对所有元素数量做除法
     */
    private $_divisor = 1;

    /**
     * 设置某个元素的数量记录
     * @param mixed 元素
     * @param int 数量
     * @example 
     * $rand->setNum('127.0.0.1:6379',7);
     */
    public function setNum($element, $num = 1) {
        $this->_elementPool[$element] = (int) $num;
    }

    /**
     * 清除某个元素的数量记录
     * @param mixed 元素
     * @example 
     * $rand->resetNum('127.0.0.1:6379');
     */
    public function resetNum($element) {
        unset($this->_elementPool[$element]);
    }

    /**
     * 返回元素信息
     * @return array
     * @example $rand->dump();
     */
    public function dump() {
        $_elements = array_count_values($this->_elements);
        ksort($_elements);
        return array(
            '_elementPool' => $this->_elementPool,
            '_divisor' => $this->_divisor(),
            '_elements' => $_elements
        );
    }

    /**
     * 得出随机结果
     * @param array 要得出随机结果的元素列表
     * @return mixed 随机出来的元素
     * @example 
     * $rand->rand();//从已经设置了数量记录的元素池中随机
     * or
     * $rand->rand(array('127.0.0.1:6379','192.168.0.182:6379','172.16.0.100:6379'));
     */
    public function rand(array $elements = array()) {
        if (empty($this->_elementPool)) {//如果元素池是空的
            if (empty($elements))
                return false; //如果元素表参数也是空的，返回false

            return $elements[array_rand($elements)]; //直接对元素表参数做随机操作
        }elseif (empty($elements))
        //如果元素表参数是空的，就从已经设置了数量记录的元素池中随机
            $elements = array_keys($this->_elementPool);

        $this->_divisor(); //确定统一除数

        $this->_elements = array();
        foreach ($elements as $element) {
            if (isset($this->_elementPool[$element])) {
                //如果已经设置了数量记录，取数量记录
                $num = $this->_elementPool[$element];
                //数量是0，不做任何操作
                if ($num === 0)
                    continue;
            }else {
                //如果未设置，取记录数组的数量平均数
                isset($avg) or ($avg = array_sum($this->_elementPool) / count($this->_elementPool));
                //如果平均数不是0，取平均数；平均数是0，取1
                $num = $avg ? $avg : 1;
            }
            $num /= $this->_divisor; //统一相除
            //var_dump($num);
            if ($num = floor($num))
                $this->_elements = array_pad($this->_elements, count($this->_elements) + $num, $element);
        }
        if (empty($this->_elements))
        //如果所有元素池中记录的待随机元素的数量都是0，直接对元素表参数做随机操作
            return $elements[array_rand($elements)];
        shuffle($this->_elements); //打乱数组
        //随机运算
        $result = $this->_elements[array_rand($this->_elements)];
        //var_dump($result);
        return $result;
    }

    /**
     * 根据最大数量的元素确定统一除数
     */
    private function _divisor() {
        if (empty($this->_elementPool))
            return 1;
        $max = max($this->_elementPool);
        while ($max / $this->_divisor > 100) {
            $this->_divisor *= 100;
        }
        return $this->_divisor;
    }

}

/**
 * 一致性哈希
 * 
 * @package Redises
 * @example
 * $hash = new RedisConsistentHashing();
 * $hash->addNode('127.0.0.1:6379');
 * $hash->getNode('keyName',2);
 * $hash->getOneNode('keyName');
 * $hash->removeNode('127.0.0.1:6379');
 */
class RedisConsistentHashing {

    /**
     * The number of positions to hash each node to.
     *
     * @var int
     * @comment 虚拟节点数,解决节点分布不均的问题
     */
    private $_replicas = 64;

    /**
     * Internal counter for current number of nodes.
     * @var int
     * @comment 节点记数器
     */
    private $_nodeCount = 0;

    /**
     * Internal map of positions (hash outputs) to nodes
     * @var array { position => node, ... }
     * @comment 位置对应节点,用于lookup中根据位置确定要访问的节点
     */
    private $_positionToNode = array();

    /**
     * Internal map of nodes to lists of positions that node is hashed to.
     * @var array { node => [ position, position, ... ], ... }
     * @comment 节点对应位置,用于删除节点
     */
    private $_nodeToPositions = array();

    /**
     * Whether the internal map of positions to nodes is already sorted.
     * @var boolean
     * @comment 是否已排序
     */
    private $_positionToNodeSorted = false;

    /**
     * Add a node.
     * @param string $node
     * @chainable
     * @comment 添加节点,根据虚拟节点数,将节点分布到多个虚拟位置上
     */
    public function addNode($node) {
        if (isset($this->_nodeToPositions[$node])) {
            throw new RedisesException("Node '$node' already exists.");
            return $this;
        }

        $this->_nodeToPositions[$node] = array();

        // hash the node into multiple positions
        for ($i = 0; $i < $this->_replicas; $i++) {
            $position = crc32($node . $i);
            $this->_positionToNode[$position] = $node; // lookup
            //$this->_nodeToPositions[$node] []= $position; // node removal
        }

        $this->_positionToNodeSorted = false;
        $this->_nodeCount++;

        return $this;
    }

    /**
     * Remove a node.
     * @param string $node
     * @chainable
     */
    public function removeNode($node) {
        if (!isset($this->_nodeToPositions[$node])) {
            throw new RedisesException("Node '$node' does not exist.");
            return $this;
        }

        //foreach ($this->_nodeToPositions[$node] as $position) {
        $_nodeToPositions = array_keys($this->_positionToNode, $node);
        foreach ($_nodeToPositions as $position) {
            unset($this->_positionToNode[$position]);
        }

        unset($this->_nodeToPositions[$node]);

        $this->_nodeCount--;

        return $this;
    }

    public function getOneNode($key) {
        $node = $this->getNode($key, 1);
        if (empty($node)) {
            throw new RedisesException('No targets exist');
            return false;
        }
        return $node[0];
    }

    /**
     * Get a list of nodes for the resource, in order of precedence.
     * Up to $requestedCount nodes are returned, less if there are fewer in total.
     *
     * @param string $key
     * @param int $requestedCount The length of the list to return
     * @return array List of nodes
     * @comment 查找当前的资源对应的节点,
     *          节点为空则返回空,节点只有一个则返回该节点,
     *          对当前资源进行hash,对所有的位置进行排序,在有序的位置列上寻找当前资源的位置
     *          当全部没有找到的时候,将资源的位置确定为有序位置的第一个(形成一个环)
     *          返回所找到的节点
     */
    public function getNode($key, $requestedCount = 1) {
        if (!$requestedCount) {
            return array();
        }

        // handle no nodes
        if (empty($this->_positionToNode)) {
            return array();
        }

        // optimize single node
        if ($this->_nodeCount == 1) {
            return array_unique(array_values($this->_positionToNode));
        }

        // hash resource to a position
        $keyPosition = crc32($key);

        $results = array();
        $collect = false;

        $this->_sortPositionNodes();

        // search values above the resourcePosition
        foreach ($this->_positionToNode as $key => $value) {
            // start collecting nodes after passing resource position, only collect the first instance of any node
            if (($key > $keyPosition) && !in_array($value, $results)) {
                $results [] = $value;

                // return when enough results, or list exhausted
                if ((count($results) == $requestedCount) || (count($results) == $this->_nodeCount)) {
                    return $results;
                }
            }
        }

        // loop to start - search values below the resourcePosition
        foreach ($this->_positionToNode as $key => $value) {
            if (!in_array($value, $results)) {
                $results [] = $value;

                // return when enough results, or list exhausted
                if ((count($results) == $requestedCount) || (count($results) == $this->_nodeCount)) {
                    return $results;
                }
            }
        }

        // return results after iterating through both "parts"
        return $results;
    }

    /**
     * Sorts the internal mapping (positions to nodes) by position
     */
    private function _sortPositionNodes() {
        // sort by key (position) if not already
        if (!$this->_positionToNodeSorted) {
            ksort($this->_positionToNode, SORT_REGULAR);
            $this->_positionToNodeSorted = true;
        }
    }

}

/**
 * 分布式存储的redis队列
 *
 * @package Redises
 * @example
 * $redis = new RedisQ();
 * $redis->push('list1', 111);
 * $msg = $redis->pop('list1');
 */
class RedisQ extends Redises {

    /**
     * 队列长度记录
     * @var array
     */
    private $_llen = array(
        //'127.0.0.1:6379'=>7,
    );

    /**
     * 弹出消息条数记录
     * @var array
     */
    private $_pop = array(
        //'127.0.0.1:6379'=>7,
    );
    
    /**
     * 最后一条非空的消息
     * @var string
     */
    private $_lastMsg;
    
    public function __construct() {
        parent::__construct();

        //随机分布模式
        $this->mode = REDIS_RAND;
    }

    public function __destruct() {
        parent::__destruct();
    }

    /**
     * 插入队列头
     * @param string $key 队列名
     * @param mixed $value 待插入元素
     * @return bool
     * @example 
     * $redis->push('key123', 111);
     */
    public function push($key, $value) {
        return $this->rpush($key, $value);
    }

    /**
     * 弹出队列尾
     * @param string $key 队列名
     * @return string
     * @example 
     * $msg = $redis->pop('key123');
     */
    public function pop($key, $timeout = 0) {
        //if (!$msg = $this->brpop($key,$timeout)){
        if (!$msg = $this->lpop($key)) {
            $this->_llen[$this->_lastHost] = 0;
            return false;
        }
        //记录队列长度
        $this->_llen[$this->_lastHost] = (int) $this->_lastLink->llen($key);
        //记录弹出消息条数
        if (isset($this->_pop[$this->_lastHost]))
            $this->_pop[$this->_lastHost]++;
        else
            $this->_pop[$this->_lastHost] = 1;
        //记录最后一条非空的消息
        $this->_lastMsg = $msg;
        return $this->_lastMsg;
    }

    //从队列左侧插入元素
    public function lpush($key, $value) {
        $llen = $this->randRedis()->lPush($key, $value);
        $this->_llen[$this->_lastHost] = (int) $llen;
        return $llen;
    }

    //从队列右侧插入元素
    public function rpush($key, $value) {
        $llen =  $this->randRedis()->rPush($key, $value);
        $this->_llen[$this->_lastHost] = (int) $llen;
        return $llen;
    }

    /**
     * 阻塞式(blocking)弹出右侧队列尾
     * @param string $key 队列名
     * @param int $timeout 等待超时时间
     * @comment 当给定列表内没有任何元素可供弹出的时候，连接将被 BRPOP 命令阻塞，直到等待超时或发现可弹出元素为止。
     * @return array
     * @example 
     * $msg = $redis->brpop('key123',0);
     * or
     * $msg = $redis->brpop('key123',3);
     * 
     * $msg = $msg[1];
     */
    public function brpop($key, $timeout) {
        return $this->getRedis($key)->brPop($key, $timeout);
    }

    //阻塞式(blocking)弹出左侧队列尾
    public function blpop($key, $timeout) {
        return $this->getRedis($key)->blPop($key, $timeout);
    }

    //弹出右侧队列尾
    public function rpop($key) {
        return $this->getRedis($key)->rPop($key);
    }

    //弹出左侧队列尾
    public function lpop($key) {
        return $this->getRedis($key)->lPop($key);
    }

    /**
     * 返回最后一条非空的消息
     * @return string/null
     * @example 
     * $redis->lastmsg();
     */
    public function lastmsg() {
        return $this->_lastMsg;
    }
    
    /**
     * 返回列表key的长度
     * @param string $key 队列名
     * @return int
     * @example 
     * $llen = $redis->llen('key123');
     */
    public function llen($key) {
        $llen = $this->getRedis($key)->llen($key);
        $this->_llen[$this->_lastHost] = $llen;
        return $llen;
    }
    
    /**
     * 返回所有服务器队列长度记录
     * @return array
     * array (
     *  '192.168.0.180:6379' => 7,
     *  '192.168.0.181:6379' => 0,
     *  '192.168.0.182:6379' => 69,
     * )
     * @example $redis->dumpllen();
     */
    public function dumpllen(){
        return $this->_llen;
    }
    
    /**
     * 返回所有服务器队列长度总和
     * @return int
     * @example $redis->sumllen();
     */
    
    public function sumllen(){
        return array_sum($this->_llen);
    }
    
     /**
     * 返回所有服务器弹出消息条数记录
     * @return array
     * array (
     *  '192.168.0.180:6379' => 7,
     *  '192.168.0.181:6379' => 0,
     *  '192.168.0.182:6379' => 69,
     * )
     * @example $redis->dumpllen();
     */
    public function dumppop(){
        return $this->_pop;
    }

    protected function getRedis($key) {
        return $this->randRedis($this->_llen);
    }

}

/**
 * 哈希分布的redis值存储，支持数组序列化
 *
 * @package Redises
 * @example
 * $redis = new RedisX();
 * $redis->set('key1', 111);
 * $redis->set('key2', array(111));
 * $value = $redis->get('key1');
 */
class RedisX extends Redises {

    const sSign = '[:s:]';//serialize sign
    
    private static $sSignLen;
    
    public function __construct() {
        parent::__construct();
        self::$sSignLen = strlen(self::sSign);
    }

    public function __destruct() {
        parent::__destruct();
    }

    /**
     * 存值
     * @param string $key key名
     * @param mixed $value key值
     * @return bool
     * @example 
     * $redis->set('key123', 111);
     */
    public function set($key, $value, $timeout = 3600) {
        $redis = $this->hashRedis($key);
        $value = $this->_serialize($value);
        if ($timeout)
            return $redis->setex($key, $timeout, $value);
        else
            return $redis->set($key, $value);
    }

    /**
     * 取值
     * @param string $key key名
     * @return mixed
     * @example 
     * $redis->get('key123');
     */
    public function get($key) {
        return $this->_unserialize($this->hashRedis($key)->get($key));
    }

    /**
     * 将一个或多个member元素加入到集合key当中，已经存在于集合的member元素将被忽略。
     * 假如key不存在，则创建一个只包含member元素作成员的集合。
     * 当key不是集合类型时，返回一个错误
     * @param string $key key名
     * @return bool
     * @example 
     * $redis->sAdd('key1234', 111);
     * or
     * $redis->sAdd('key1234', array(111));
     */
    public function sAdd($key, $value) {
        return $this->hashRedis($key)->sAdd($key, $this->_serialize($value));
    }

    /**
     * 返回集合key中的所有成员
     * @param string $key key名
     * @return array 集合中的所有成员
     * @example 
     * $arr = $redis->sMembers('key1234');
     */
    public function sMembers($key) {
        $redis = $this->hashRedis($key);
        $sMembers = $redis->sMembers($key);
        foreach ($sMembers as $key => $value) {
            $sMembers[$key] = $this->_unserialize($value);
        }
        return $sMembers;
    }

     /**
     * 将哈希表key中的域field的值设为value。
     * 如果key不存在，一个新的哈希表被创建并进行HSET操作。
     * 如果域field已经存在于哈希表中，旧值将被覆盖。
     * @param string $key key名
     * @param string $field 域名
     * @param mixed $data 值
     * @return int
     * 如果field是哈希表中的一个新建域，并且值设置成功，返回1。
     * 如果哈希表中域field已经存在且旧值已被新值覆盖，返回0。
     * @example 
     * $redis->hSet('key1234', 111);
     * or
     * $redis->hSet('key1234', array(111));
     */
    public function hSet($key, $field, $data, $lifetime = 0){
        $redis = $this->hashRedis($key);
        $lifetime = $lifetime?time()+$lifetime:0;
        $data = "$lifetime\n".$this->_serialize($data);
        return $redis->hset($key, $field, $data);
    }
    
    /**
     * 返回哈希表key中给定域field的值。
     * @param string $key key名
     * @param string $field 域名
     * @return mixed 给定域的值。当给定域不存在或是给定key不存在时，返回nil。
     * @example 
     * $arr = $redis->hGet('key1234');
     */
    public function hGet($key, $field) {
        $redis = $this->hashRedis($key);
        if ($data = $redis->hget($key, $field))
            if (($data = $this->_activity($data)) === false)
                $redis->hdel($key, $field);//如果已经过期，就删除
        return $this->_unserialize($data);
    }
    
    //储值序列化
    private function _serialize($value) {
        if (is_array($value) || is_object($value))
            return self::sSign . serialize($value);
        return $value;
    }

    //取值反序列化
    private function _unserialize($str) {
        if ($str && (strpos($str,self::sSign) === 0)){
            $str = substr($str,self::$sSignLen);
            return unserialize($str);
        }else
            return $str;
    }

    //判断是否超时
    private function _activity($data){
        if ($data && (($nl_pos = strpos($data,"\n")) > 0)){
            $lifetime = (int)substr($data,0,$nl_pos);
            if ($lifetime && ($lifetime < time()))
                return false;
            return substr($data,$nl_pos+1);
        }else
            return $data;
    }
}

/**
 * 错误捕抓
 * 
 * @package Redises
 * @example
 * throw new RedisesException("$error");
 */
class RedisesException extends Exception {

    public function __construct($message) {
        parent::__construct($message);
        if (!REDISES_DEBUG)
            set_exception_handler(array($this, 'set_exception_handler'));
    }

    public function __destruct() {
        if (!REDISES_DEBUG)
            restore_exception_handler();
    }

    //Redises错误处理句柄
    public function set_exception_handler(Exception $e) {
        //$e->getMessage();
    }

}

/**
 * 一个空类，对该类的实例做任何操作都只返回false  
 * 
 * @package Redises
 * @example
 * $redis = new void();
 * $r = $redis->set('key',123);
 * $r == false;
 */
class void {

    public function __call($name, $arguments) {
        return false;
    }

    public function __callStatic($name, $arguments) {
        return false;
    }

    public function __set($name, $value) {
        return false;
    }

    public function __get($name) {
        return false;
    }

}